import os
import string
import subprocess
import sys
import json
from typing import Text, List, Tuple, Dict, Set, NoReturn
from urllib.parse import urlsplit
from jmeter_api.non_test_elements.test_plan.elements import TestPlan
from jmeter_api.configs.user_defined_variables.elements import UserDefineVariables, Argument
from jmeter_api.configs.http_cookie_manager.elements import HTTPCookieManager, Cookie
from jmeter_api.configs.http_header_manager.elements import HTTPHeaderManager, Header
from jmeter_api.samplers.http_request.elements import HttpRequest, FileUpload, Method, Protocol
from jmeter_api.controllers.simple_controller.elements import SimpleController
from jmeter_api.post_processors.json_extractor.elements import JSONExtractor
from jmeter_api.assertions.response.elements import ResponseAssertion, TestField, TestType
from jmeter_api.assertions.json.elements import JSONAssertion
from jmeter_api.thread_groups.common_thread_group.elements import CommonThreadGroup

from loguru import logger
from sentry_sdk import capture_exception

from httprunner2jmeter import exceptions, __version__
from httprunner2jmeter.compat import (
    ensure_testcase_v3_api,
    ensure_testcase_v3,
    convert_variables,
    ensure_path_sep,
)
from httprunner2jmeter.loader import (
    load_folder_files,
    load_test_file,
    load_testcase,
    load_testsuite,
    load_project_meta,
    convert_relative_project_root_dir,
)
from httprunner2jmeter.response import uniform_validator
from httprunner2jmeter.utils import merge_variables, is_support_multiprocessing

""" cache converted pytest files, avoid duplicate making
"""
# Looked up by make_testcase() with the generated file's absolute path as the
# key; nothing in this module adds entries to it.
# NOTE(review): the "pytest" naming appears to be inherited from the pytest
# generator this module was adapted from - confirm before relying on it.
pytest_files_made_cache_mapping: Dict[Text, Text] = {}

""" save generated pytest files to run, except referenced testcase
"""
# Filled by __make() with input paths that already end in "_test.jmx";
# main_make() returns its content as a list.
pytest_files_run_set: Set = set()


def __ensure_absolute(path: Text) -> Text:
    """Resolve a testcase path to an absolute path of an existing file.

    A leading ``./`` or ``.\\`` is dropped, separators are normalised with
    ``ensure_path_sep`` and a relative path is joined onto the project root
    directory reported by ``load_project_meta``.

    Args:
        path: absolute or project-relative path of a testcase file.

    Returns:
        The absolute path. The process exits with status 1 (after logging an
        error) when that path is not an existing file.
    """
    # hrun ./test.yml (Linux/Darwin) or hrun .\test.yml (Windows)
    for current_dir_prefix in ("./", ".\\"):
        if path.startswith(current_dir_prefix):
            path = path[len(current_dir_prefix):]
            break

    path = ensure_path_sep(path)
    project_meta = load_project_meta(path)

    absolute_path = (
        path if os.path.isabs(path) else os.path.join(project_meta.RootDir, path)
    )

    if os.path.isfile(absolute_path):
        return absolute_path

    logger.error(f"Invalid testcase file path: {absolute_path}")
    sys.exit(1)


def ensure_file_abs_path_valid(file_abs_path: Text) -> Text:
    """Return a sanitised variant of an absolute file path.

    The path is made relative to the project root and every path component is
    cleaned up:

    * a component starting with a digit gets a ``T`` prefix (19 => T19);
    * spaces, dots and hyphens become underscores, except in components that
      start with a dot (so ".csv" is not turned into "_csv").

    The file suffix is lower-cased and re-attached, and the result is joined
    back onto the project root directory.

    Args:
        file_abs_path: absolute file path

    Returns:
        The sanitised absolute path, or ``file_abs_path`` unchanged when its
        project-relative name is empty.

    """
    project_meta = load_project_meta(file_abs_path)
    stem, suffix = os.path.splitext(file_abs_path)
    suffix = suffix.lower()

    relative_stem = convert_relative_project_root_dir(stem)
    if relative_stem == "":
        return file_abs_path

    # one pass instead of three chained str.replace() calls
    to_underscore = str.maketrans(" .-", "___")

    cleaned_parts = []
    for part in relative_stem.rstrip(os.sep).split(os.sep):
        if part[0] in string.digits:
            part = "T" + part

        if not part.startswith("."):
            part = part.translate(to_underscore)

        cleaned_parts.append(part)

    relative_path = os.sep.join(cleaned_parts) + suffix
    return os.path.join(project_meta.RootDir, relative_path)


def __ensure_testcase_module(path: Text) -> NoReturn:
    """Create ``__init__.py`` next to ``path`` unless that file already exists.

    An existing ``__init__.py`` is left untouched; a new one contains a single
    "generated" notice line.
    """
    package_dir = os.path.dirname(path)
    init_file = os.path.join(package_dir, "__init__.py")

    if not os.path.isfile(init_file):
        with open(init_file, "w", encoding="utf-8") as init_fp:
            init_fp.write("# NOTICE: Generated By HttpRunner. DO NOT EDIT!\n")


def convert_testcase_path(testcase_abs_path: Text) -> Tuple[Text, Text]:
    """Derive the generated file path and a title-case name for a testcase.

    Args:
        testcase_abs_path: absolute path of a YAML/JSON testcase file.

    Returns:
        ``(path, name)`` where ``path`` is ``<dir>/<file_name>_test.jmx`` built
        from the sanitised testcase path, and ``name`` is the sanitised file
        name in title case without underscores,
        e.g. request_with_variables => RequestWithVariables.
    """
    sanitized_path = ensure_file_abs_path_valid(testcase_abs_path)

    parent_dir, base_name = os.path.split(sanitized_path)
    stem = os.path.splitext(base_name)[0]

    generated_path = os.path.join(parent_dir, f"{stem}_test.jmx")
    title_case_name = stem.title().replace("_", "")

    return generated_path, title_case_name


def format_pytest_with_black(*python_paths: Text) -> None:
    """Format the given files in place with the external ``black`` tool.

    All paths go to a single ``black`` invocation when
    ``is_support_multiprocessing()`` is true or when at most one path is
    given; otherwise ``black`` is invoked once per path.

    Args:
        python_paths: paths of the files to format.

    The process exits with status 1 when ``black`` is not installed, or when a
    ``black`` run ends with a non-zero exit code.
    """
    logger.info("format pytest cases with black ...")
    try:
        # check=True is what makes subprocess.run raise CalledProcessError on a
        # non-zero exit code; without it the handler below can never trigger.
        if is_support_multiprocessing() or len(python_paths) <= 1:
            subprocess.run(["black", *python_paths], check=True)
        else:
            logger.warning(
                "this system does not support multiprocessing well, format files one by one ..."
            )
            for path in python_paths:
                subprocess.run(["black", path], check=True)
    except subprocess.CalledProcessError as ex:
        capture_exception(ex)
        logger.error(ex)
        sys.exit(1)
    except FileNotFoundError:
        err_msg = """
missing dependency tool: black
install black manually and try again:
$ pip install black
"""
        logger.error(err_msg)
        sys.exit(1)


def make_plan_config(config: Dict, plan_config: List = None, jmeter_variables: List = None) -> Tuple[List, List]:
    """Collect JMeter user-defined variables from an HttpRunner config.

    ``config["base_url"]`` (when present) and every entry of a dict-valued
    ``config["variables"]`` are turned into ``Argument`` objects. Names already
    listed in ``jmeter_variables`` are skipped. A ``base_url`` value is reduced
    to its hostname; an empty string is used when it has none.

    Args:
        config: testcase/testsuite config mapping; ``base_url`` and
            ``variables`` are both optional.
        plan_config: list that receives the new ``Argument`` objects; it is
            extended in place. A fresh list is used when omitted.
        jmeter_variables: names of variables that are already defined; it is
            extended in place. A fresh list is used when omitted.

    Returns:
        ``(plan_config, jmeter_variables)`` - the same list objects that were
        passed in (or the freshly created ones).
    """
    # Fresh lists per call: list defaults in the signature would be shared by
    # every call and leak arguments/names from one plan into the next.
    if plan_config is None:
        plan_config = []
    if jmeter_variables is None:
        jmeter_variables = []

    if "base_url" in config and "base_url" not in jmeter_variables:
        hostname = urlsplit(config["base_url"] or "").hostname
        plan_config.append(Argument(name="base_url", value=hostname or ""))
        jmeter_variables.append("base_url")

    variables = config.get("variables")
    if isinstance(variables, dict):
        for name, value in variables.items():
            if name in jmeter_variables:
                continue
            if name == "base_url":
                # same normalisation as for config["base_url"] above
                value = urlsplit(value).hostname or ""
            plan_config.append(Argument(name=name, value=str(value)))
            jmeter_variables.append(name)

    return plan_config, jmeter_variables


def _to_jmeter_variables(value, variable_names) -> Text:
    """Rewrite ``$name`` references to ``${name}`` for the given variable names."""
    import re  # local import: ``re`` is not imported at module level

    text = value if isinstance(value, str) else str(value)
    names = sorted(
        {str(name) for name in variable_names if str(name)}, key=len, reverse=True
    )
    if not names:
        return text

    # The lookahead keeps "$ab" intact when only "a" is a known name; a plain
    # str.replace() would turn it into "${a}b".
    pattern = r"\$(" + "|".join(re.escape(name) for name in names) + r")(?!\w)"
    return re.sub(pattern, lambda match: "${" + match.group(1) + "}", text)


def make_plan_request(jmeter_request: HttpRequest, request: Dict, jmeter_variables: List = None) -> HttpRequest:
    """Fill a JMeter ``HttpRequest`` from an HttpRunner ``request`` mapping.

    Method and url are copied (the url is used as both sampler name and path).
    Optional ``headers``, ``cookies``, ``data``, ``json``, ``timeout``,
    ``allow_redirects`` and ``upload`` entries are translated to the matching
    sampler settings or child elements. In header/cookie values, bodies and
    upload fields, ``$name`` references to known variables are rewritten to
    ``${name}``.

    Args:
        jmeter_request: sampler to configure; modified in place.
        request: HttpRunner request mapping; ``method`` and ``url`` are
            required. It is not modified.
        jmeter_variables: names of the variables whose references should be
            rewritten. Treated as empty when omitted.

    Returns:
        The same ``jmeter_request`` object.
    """
    known_names = jmeter_variables or []

    method = request["method"].upper()
    url = request["url"]
    # NOTE(review): "$name" references inside the url are not rewritten -
    # confirm whether that is intended.
    jmeter_request.name = jmeter_request.path = url
    jmeter_request.method = Method(method)

    headers = request.get("headers")
    if isinstance(headers, dict):
        header_items = [
            Header(name=name, value=_to_jmeter_variables(value, known_names))
            for name, value in headers.items()
        ]
        jmeter_request.append(HTTPHeaderManager(headers=header_items))

    cookies = request.get("cookies")
    if isinstance(cookies, dict):
        cookie_items = [
            Cookie(name=name, value=_to_jmeter_variables(value, known_names))
            for name, value in cookies.items()
        ]
        jmeter_request.append(HTTPCookieManager(cookies=cookie_items))

    # "data" and "json" are both sent as raw body data; non-string payloads
    # are serialised to JSON text first.
    for body_key in ("data", "json"):
        if body_key not in request:
            continue
        body = request[body_key]
        if not isinstance(body, str):
            body = json.dumps(body)
        jmeter_request.add_body_data(_to_jmeter_variables(body, known_names))

    if "timeout" in request:
        # NOTE(review): the value is forwarded unchanged - confirm the unit
        # HttpRequest expects for its timeouts.
        timeout = request["timeout"]
        jmeter_request.connect_timeout = jmeter_request.response_timeout = timeout

    if "allow_redirects" in request:
        jmeter_request.auto_redirect = request["allow_redirects"]

    upload = request.get("upload")
    if isinstance(upload, dict):
        # Build converted values locally instead of writing them back into the
        # caller's request mapping.
        jmeter_request.add_file_upload(
            FileUpload(
                file_path=_to_jmeter_variables(upload.get("file", ""), known_names),
                param_name=_to_jmeter_variables(upload.get("name", ""), known_names),
                mime_type=_to_jmeter_variables(upload.get("type", ""), known_names),
            )
        )

    return jmeter_request


def make_plan_teststep(teststep: Dict, jmeter_config=None, jmeter_variables: List = None) -> Tuple:
    """Convert one HttpRunner teststep into a JMeter element.

    A step with a truthy ``request`` becomes an ``HttpRequest`` sampler (host
    ``${base_url}``) that carries the step's variables, JSON extractors for
    ``extract`` entries starting with ``body`` and assertions for ``validate``
    entries that check ``body...`` or ``status_code``.

    Otherwise the step's ``testcase`` file is loaded and converted into a
    ``SimpleController`` holding that testcase's variables and its recursively
    converted steps.

    Args:
        teststep: teststep mapping with a ``request`` or ``testcase`` entry.
        jmeter_config: config mapping of the enclosing testcase; only its
            ``base_url`` is read here, to pick the sampler's protocol and port.
            Treated as empty when omitted.
        jmeter_variables: names of already known variables; extended in place
            with step variables and extracted names. A fresh list is used when
            omitted.

    Returns:
        ``(element, jmeter_variables)`` where ``element`` is the sampler or the
        controller.

    Raises:
        exceptions.TestCaseFormatError: the step has neither ``request`` nor
            ``testcase``, or the referenced testcase file is not a mapping.
    """
    # Fresh containers per call; defaults in the signature would be shared by
    # every call, and the variable list is appended to below.
    if jmeter_config is None:
        jmeter_config = {}
    if jmeter_variables is None:
        jmeter_variables = []

    if not (teststep.get("request") or teststep.get("testcase")):
        raise exceptions.TestCaseFormatError(f"Invalid teststep: {teststep}")

    if not teststep.get("request"):
        # step references another testcase file
        ref_testcase_path = __ensure_absolute(teststep["testcase"])
        test_content = load_test_file(ref_testcase_path)

        if not isinstance(test_content, Dict):
            raise exceptions.TestCaseFormatError(f"Invalid teststep: {teststep}")

        # api in v2 format, convert to v3 testcase
        if "request" in test_content and "name" in test_content:
            test_content = ensure_testcase_v3_api(test_content)

        config = test_content.get('config', {})
        # a referenced testcase may declare no variables at all
        config.setdefault("variables", {})
        jmeter_simple = SimpleController(name=config.get('name'))
        plan_arguments, jmeter_variables = make_plan_config(
            config=config, plan_config=[], jmeter_variables=jmeter_variables
        )
        jmeter_simple.append(UserDefineVariables(arguments=plan_arguments))
        for step in test_content.get('teststeps', []):
            child, jmeter_variables = make_plan_teststep(
                teststep=step, jmeter_config=config, jmeter_variables=jmeter_variables
            )
            jmeter_simple.append(child)
        return jmeter_simple, jmeter_variables

    # request step; a missing base_url falls back to https with no port
    net_url = urlsplit(jmeter_config.get("base_url") or "")
    jmeter_request = HttpRequest(
        host='${base_url}',
        protocol=Protocol(net_url.scheme.lower() or 'https'),
        port=net_url.port,
    )

    if "variables" in teststep:
        step_arguments = []
        variables = teststep["variables"]
        if isinstance(variables, dict):
            for name, value in variables.items():
                step_arguments.append(Argument(name=name, value=str(value)))
                if name not in jmeter_variables:
                    jmeter_variables.append(name)
        jmeter_request.append(UserDefineVariables(arguments=step_arguments))

    jmeter_request = make_plan_request(jmeter_request, teststep["request"], jmeter_variables)

    if "extract" in teststep:
        for extract_name, extract_path in teststep["extract"].items():
            # only body extractions are converted: "body.a.b" => "$.a.b"
            if extract_path.startswith('body'):
                json_path = '${}'.format(extract_path[4:])
                jmeter_request.append(
                    JSONExtractor(name=extract_name, referenceNames=extract_name, jsonPathExprs=json_path)
                )
                if extract_name not in jmeter_variables:
                    jmeter_variables.append(extract_name)

    if "validate" in teststep:
        for raw_validator in teststep["validate"]:
            validator = uniform_validator(raw_validator)
            check = validator["check"]
            if check.startswith('body'):
                # "body.a.b" => "$.a.b"
                json_path = '${}'.format(check[4:])
                expect = validator["expect"]
                if not isinstance(expect, Text):
                    expect = str(expect)
                jmeter_request.append(JSONAssertion(json_path=json_path, expected_value=expect))
            elif check == 'status_code':
                expect = validator["expect"]
                if not isinstance(expect, Text):
                    expect = str(expect)
                jmeter_request.append(
                    ResponseAssertion(
                        test_field=TestField.RESPONSE_CODE,
                        test_type=TestType.EQUALS,
                        patterns=[expect],
                    )
                )
            # validators on anything else are not converted

    return jmeter_request, jmeter_variables


def make_testcase(testcase: Dict, dir_path: Text = None, jmeter_plan: TestPlan = None, jmeter_variables: List = None) -> Text:
    """Convert a valid testcase dict into a JMeter thread group.

    The testcase's config variables and teststeps are converted and collected
    in a ``CommonThreadGroup`` that is appended to ``jmeter_plan``.

    When no ``jmeter_plan`` is passed, a new ``TestPlan`` is created, the
    config variables are attached to the plan itself, and the plan is written
    as XML next to the testcase file (same path with a ``.jmx`` suffix). When a
    plan is passed, the variables are attached to the thread group and nothing
    is written.

    Args:
        testcase: testcase mapping; ``config.path`` must point to its file.
        dir_path: optional directory used for the generated path stored in
            ``config["path"]`` and used as cache key.
            NOTE(review): it does not affect where the ``.jmx`` file is written.
        jmeter_plan: plan to extend; a new one is created when omitted.
        jmeter_variables: names of already known variables; extended in place.
            A fresh list is used when omitted.

    Returns:
        The ``TestPlan`` containing the new thread group.
        NOTE(review): if the generated path is found in
        ``pytest_files_made_cache_mapping`` that path (a string) is returned
        instead, which is what the ``Text`` annotation describes.
    """
    # A list default in the signature would be shared by every standalone
    # conversion, so later testcases would see earlier names as "already
    # defined" and lose base_url and same-named variables.
    if jmeter_variables is None:
        jmeter_variables = []

    # ensure compatibility with testcase format v2
    testcase = ensure_testcase_v3(testcase)

    # validate testcase format
    load_testcase(testcase)

    testcase_abs_path = __ensure_absolute(testcase["config"]["path"])
    logger.info(f"start to make testcase: {testcase_abs_path}")

    testcase_python_abs_path, _ = convert_testcase_path(testcase_abs_path)
    if dir_path:
        testcase_python_abs_path = os.path.join(
            dir_path, os.path.basename(testcase_python_abs_path)
        )

    if testcase_python_abs_path in pytest_files_made_cache_mapping:
        return testcase_python_abs_path

    config = testcase["config"]
    config["path"] = convert_relative_project_root_dir(testcase_python_abs_path)
    config["variables"] = convert_variables(
        config.get("variables", {}), testcase_abs_path
    )

    # standalone testcase => own plan; otherwise extend the caller's plan
    create_plan = jmeter_plan is None
    if create_plan:
        jmeter_plan = TestPlan(name=config.get('name'))

    plan_arguments, jmeter_variables = make_plan_config(
        config=config, plan_config=[], jmeter_variables=jmeter_variables
    )
    thread_group = CommonThreadGroup(name=config.get('name'), num_threads=1, ramp_time=1, loops=1)
    if create_plan:
        jmeter_plan.append(UserDefineVariables(arguments=plan_arguments))
    else:
        thread_group.append(UserDefineVariables(arguments=plan_arguments))

    for step in testcase["teststeps"]:
        element, jmeter_variables = make_plan_teststep(
            teststep=step, jmeter_config=config, jmeter_variables=jmeter_variables
        )
        thread_group.append(element)
    jmeter_plan.append(thread_group)

    if create_plan:
        content = jmeter_plan.to_xml()

        # demo_testcase.yml => demo_testcase.jmx, next to the source file
        source_stem, _ = os.path.splitext(testcase_abs_path)
        testcase_python_abs_path = f"{source_stem}.jmx"
        with open(testcase_python_abs_path, "w", encoding="utf-8") as f:
            f.write(content)

        logger.info(f"generated testcase: {testcase_python_abs_path}")

    return jmeter_plan


def make_testsuite(testsuite: Dict) -> None:
    """Convert a valid testsuite dict into a single JMeter plan file.

    A ``TestPlan`` is created from the testsuite config; every referenced
    testcase is loaded, has its name, base_url, verify, variables, weight and
    order overridden from the testsuite, and is added to the plan through
    ``make_testcase``. The plan is written as XML to the sanitised testsuite
    path with a ``.jmx`` suffix.

    Args:
        testsuite: testsuite mapping with ``config`` (including ``path``) and
            ``testcases``.
    """
    # validate testsuite format
    load_testsuite(testsuite)

    testsuite_config = testsuite["config"]
    testsuite_path = testsuite_config["path"]
    testsuite_variables = convert_variables(
        testsuite_config.get("variables", {}), testsuite_path
    )

    logger.info(f"start to make testsuite: {testsuite_path}")

    jmeter_plan = TestPlan(name=testsuite_config.get('name'))

    # Use the converted variables (a testsuite without "variables" would
    # otherwise fail on the missing key) and explicit fresh lists so that no
    # state is shared with other conversions.
    plan_source = {**testsuite_config, "variables": testsuite_variables}
    plan_arguments, jmeter_variables = make_plan_config(
        config=plan_source, plan_config=[], jmeter_variables=[]
    )
    jmeter_plan.append(UserDefineVariables(arguments=plan_arguments))

    for testcase in testsuite["testcases"]:
        # get referenced testcase content
        testcase_file = testcase["testcase"]
        testcase_path = __ensure_absolute(testcase_file)
        testcase_dict = load_test_file(testcase_path)
        testcase_dict.setdefault("config", {})
        testcase_config = testcase_dict["config"]
        testcase_config["path"] = testcase_path

        # override testcase name
        testcase_config["name"] = testcase["name"]
        # override base_url, the testsuite config value wins
        base_url = testsuite_config.get("base_url") or testcase.get("base_url")
        if base_url:
            testcase_config["base_url"] = base_url
        # override verify
        if "verify" in testsuite_config:
            testcase_config["verify"] = testsuite_config["verify"]

        # override variables
        # testsuite testcase variables > testcase config variables
        testcase_variables = convert_variables(
            testcase.get("variables", {}), testcase_path
        )
        testcase_config["variables"] = convert_variables(
            testcase_config.get("variables", {}), testcase_path
        )
        testcase_config["variables"].update(testcase_variables)

        # override weight
        if "weight" in testcase:
            testcase_config["weight"] = testcase["weight"]

        # override order
        if "order" in testcase:
            try:
                if not isinstance(testcase.get('order'), int) and testcase.get('order') not in ('first', 'second', 'last', 'second_to_last'):
                    raise exceptions.TestCaseFormatError('order is must be an number, or choise in "first", "second", "last" and "second_to_last"')
                testcase_config["order"] = testcase["order"]
            except exceptions.TestCaseFormatError as ex:
                logger.warning(
                    f"Invalid order value: : {ex}"
                )
                # NOTE(review): this stops processing the remaining testcases
                # as well; the plan built so far is still written below.
                break

        # make testcase, sharing the plan and the known variable names
        jmeter_plan = make_testcase(testcase_dict, jmeter_plan=jmeter_plan, jmeter_variables=jmeter_variables)

    content = jmeter_plan.to_xml()

    # demo_testsuite.yml => demo_testsuite.jmx (path sanitised first)
    testsuite_path = ensure_file_abs_path_valid(testsuite_path)
    testsuite_stem, _ = os.path.splitext(testsuite_path)
    testcase_python_abs_path = f"{testsuite_stem}.jmx"
    with open(testcase_python_abs_path, "w", encoding="utf-8") as f:
        f.write(content)

    logger.info(f"generated testcase: {testcase_python_abs_path}")


def __make(tests_path: Text) -> NoReturn:
    """Convert the testcase/testsuite file(s) found at ``tests_path``.

    A directory is expanded with ``load_folder_files``; a single file is used
    as is. Files ending in ``_test.jmx`` are only recorded in
    ``pytest_files_run_set``. Every other file is loaded and handed to
    ``make_testcase`` (content has ``teststeps``) or ``make_testsuite``
    (content has ``testcases``). Files that cannot be loaded or have an
    unexpected shape are skipped with a warning.

    Args:
        tests_path: should be in absolute path

    Raises:
        exceptions.TestcaseNotFound: ``tests_path`` is neither a directory nor
            a file.
    """
    logger.info(f"make path: {tests_path}")

    if os.path.isdir(tests_path):
        test_files = list(load_folder_files(tests_path))
    elif os.path.isfile(tests_path):
        test_files = [tests_path]
    else:
        raise exceptions.TestcaseNotFound(f"Invalid tests path: {tests_path}")

    for test_file in test_files:
        if test_file.lower().endswith("_test.jmx"):
            pytest_files_run_set.add(test_file)
            continue

        try:
            test_content = load_test_file(test_file)
        except (exceptions.FileNotFound, exceptions.FileFormatError) as ex:
            logger.warning(f"Invalid test file: {test_file}\n{type(ex).__name__}: {ex}")
            continue

        if not isinstance(test_content, Dict):
            logger.warning(
                f"Invalid test file: {test_file}\n"
                f"reason: test content not in dict format."
            )
            continue

        # api in v2 format, convert to v3 testcase
        if "request" in test_content and "name" in test_content:
            test_content = ensure_testcase_v3_api(test_content)

        if "config" not in test_content:
            logger.warning(
                f"Invalid testcase/testsuite file: {test_file}\n"
                f"reason: missing config part."
            )
            continue

        if not isinstance(test_content["config"], Dict):
            logger.warning(
                f"Invalid testcase/testsuite file: {test_file}\n"
                f"reason: config should be dict type, got {test_content['config']}"
            )
            continue

        # record where the content came from (absolute path)
        test_content["config"]["path"] = test_file

        if "teststeps" in test_content:
            # testcase
            try:
                make_testcase(test_content)
            except exceptions.TestCaseFormatError as ex:
                logger.warning(
                    f"Invalid testcase file: {test_file}\n{type(ex).__name__}: {ex}"
                )
        elif "testcases" in test_content:
            # testsuite
            try:
                make_testsuite(test_content)
            except exceptions.TestSuiteFormatError as ex:
                logger.warning(
                    f"Invalid testsuite file: {test_file}\n{type(ex).__name__}: {ex}"
                )
        else:
            # invalid format
            logger.warning(
                f"Invalid test file: {test_file}\n"
                f"reason: file content is neither testcase nor testsuite"
            )


def main_make(tests_paths: List[Text]) -> List[Text]:
    """Convert every given path and return the collected ``_test.jmx`` files.

    Each path has its separators normalised, is made absolute against the
    current working directory when needed and is processed by ``__make``.

    Args:
        tests_paths: testcase/testsuite files or folders.

    Returns:
        The content of ``pytest_files_run_set`` as a list, or an empty list
        when ``tests_paths`` is empty. The process exits with status 1 when
        ``__make`` raises ``exceptions.MyBaseError``.
    """
    if not tests_paths:
        return []

    for raw_path in tests_paths:
        normalized = ensure_path_sep(raw_path)
        target = (
            normalized
            if os.path.isabs(normalized)
            else os.path.join(os.getcwd(), normalized)
        )

        try:
            __make(target)
        except exceptions.MyBaseError as ex:
            logger.error(ex)
            sys.exit(1)

    return list(pytest_files_run_set)

